import asyncio
import signal
from http import HTTPStatus
from typing import Any

import uvicorn
from fastapi import FastAPI, Request, Response
from loguru import logger

import aphrodite.common.envs as envs
from aphrodite.common.utils import find_process_using_port, in_windows
from aphrodite.engine.async_aphrodite import AsyncEngineDeadError
from aphrodite.engine.multiprocessing import MQEngineDeadError

APHRODITE_KEEP_ALIVE_ON_ENGINE_DEATH = (
    envs.APHRODITE_KEEP_ALIVE_ON_ENGINE_DEATH)


async def serve_http(app: FastAPI, **uvicorn_kwargs: Any):

    config = uvicorn.Config(app, **uvicorn_kwargs)
    server = uvicorn.Server(config)
    _add_shutdown_handlers(app, server)

    loop = asyncio.get_running_loop()

    server_task = loop.create_task(server.serve())

    def signal_handler() -> None:
        # prevents the uvicorn signal handler to exit early
        server_task.cancel()

    async def dummy_shutdown() -> None:
        pass

    if in_windows():
        # Windows - use signal.signal() directly
        signal.signal(signal.SIGINT, lambda signum, frame: signal_handler())
        signal.signal(signal.SIGTERM, lambda signum, frame: signal_handler())
    else:
        # Unix - use asyncio's add_signal_handler
        loop.add_signal_handler(signal.SIGINT, signal_handler)
        loop.add_signal_handler(signal.SIGTERM, signal_handler)

    try:
        await server_task
        return dummy_shutdown()
    except asyncio.CancelledError:
        port = uvicorn_kwargs["port"]
        process = find_process_using_port(port)
        if process is not None:
            logger.info(
                f"port {port} is used by process {process} launched with "
                f"command:\n{' '.join(process.cmdline())}")
        logger.info("Shutting down FastAPI HTTP server.")
        return server.shutdown()


def _add_shutdown_handlers(app: FastAPI, server: uvicorn.Server) -> None:
    """Adds handlers for fatal errors that should crash the server"""

    @app.exception_handler(RuntimeError)
    async def runtime_error_handler(request: Request, __):
        """On generic runtime error, check to see if the engine has died.
        It probably has, in which case the server will no longer be able to
        handle requests. Trigger a graceful shutdown with a SIGTERM."""
        engine = request.app.state.engine_client
        if (not APHRODITE_KEEP_ALIVE_ON_ENGINE_DEATH and engine.errored
                and not engine.is_running):
            logger.error("AsyncAphrodite has failed, terminating server "
                         "process")
            # See discussions here on shutting down a uvicorn server
            # https://github.com/encode/uvicorn/discussions/1103
            # In this case we cannot await the server shutdown here because
            # this handler must first return to close the connection for
            # this request.
            server.should_exit = True

        return Response(status_code=HTTPStatus.INTERNAL_SERVER_ERROR)

    @app.exception_handler(AsyncEngineDeadError)
    async def async_engine_dead_handler(_, __):
        """Kill the server if the async engine is already dead. It will
        not handle any further requests."""
        if not APHRODITE_KEEP_ALIVE_ON_ENGINE_DEATH:
            logger.error("AsyncAphrodite is already dead, terminating server "
                         "process")
            server.should_exit = True

        return Response(status_code=HTTPStatus.INTERNAL_SERVER_ERROR)


    @app.exception_handler(MQEngineDeadError)
    async def mq_engine_dead_handler(_, __):
        """Kill the server if the mq engine is already dead. It will
        not handle any further requests."""
        if not envs.APHRODITE_KEEP_ALIVE_ON_ENGINE_DEATH:
            logger.error("MQLLMEngine is already dead, terminating server "
                         "process")
            server.should_exit = True
        return Response(status_code=HTTPStatus.INTERNAL_SERVER_ERROR)
